package engine

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/zerolog"
	"golang.org/x/sync/errgroup"

	"github.com/hatchet-dev/hatchet/internal/services/admin"
	adminv1 "github.com/hatchet-dev/hatchet/internal/services/admin/v1"
	"github.com/hatchet-dev/hatchet/internal/services/controllers/events"
	"github.com/hatchet-dev/hatchet/internal/services/controllers/jobs"
	"github.com/hatchet-dev/hatchet/internal/services/controllers/retention"
	"github.com/hatchet-dev/hatchet/internal/services/controllers/v1/olap"
	"github.com/hatchet-dev/hatchet/internal/services/controllers/v1/task"
	"github.com/hatchet-dev/hatchet/internal/services/controllers/workflows"
	"github.com/hatchet-dev/hatchet/internal/services/dispatcher"
	dispatcherv1 "github.com/hatchet-dev/hatchet/internal/services/dispatcher/v1"
	"github.com/hatchet-dev/hatchet/internal/services/grpc"
	"github.com/hatchet-dev/hatchet/internal/services/health"
	"github.com/hatchet-dev/hatchet/internal/services/ingestor"
	"github.com/hatchet-dev/hatchet/internal/services/partition"
	"github.com/hatchet-dev/hatchet/internal/services/scheduler"
	schedulerv1 "github.com/hatchet-dev/hatchet/internal/services/scheduler/v1"
	"github.com/hatchet-dev/hatchet/internal/services/ticker"
	"github.com/hatchet-dev/hatchet/internal/services/webhooks"
	"github.com/hatchet-dev/hatchet/pkg/config/loader"
	"github.com/hatchet-dev/hatchet/pkg/config/server"
	"github.com/hatchet-dev/hatchet/pkg/config/shared"
	"github.com/hatchet-dev/hatchet/pkg/repository/cache"
	v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
	"github.com/hatchet-dev/hatchet/pkg/telemetry"
)

// Teardown pairs a human-readable service name with the function that shuts
// that service down. Teardowns are collected during startup and executed in
// order once the engine begins shutting down.
type Teardown struct {
	// Name identifies the service in shutdown log messages.
	Name string
	// Fn releases the service's resources; a non-nil error aborts the
	// remaining teardown sequence.
	Fn   func() error
}

// init eagerly initializes the global OpenTelemetry tracer from environment
// variables so that otel integrations used before server start (for example
// database instrumentation) see a configured global tracer. The tracer is
// re-initialized later from the loaded config inside runV0Config/runV1Config.
func init() {
	opts := telemetry.TracerOpts{
		ServiceName:   os.Getenv("SERVER_OTEL_SERVICE_NAME"),
		CollectorURL:  os.Getenv("SERVER_OTEL_COLLECTOR_URL"),
		TraceIdRatio:  os.Getenv("SERVER_OTEL_TRACE_ID_RATIO"),
		CollectorAuth: os.Getenv("SERVER_OTEL_COLLECTOR_AUTH"),
	}

	// only "t"/"true" (case-insensitive, whitespace-trimmed) enable insecure
	// collector connections; anything else leaves the default of false
	switch strings.ToLower(strings.TrimSpace(os.Getenv("SERVER_OTEL_INSECURE"))) {
	case "t", "true":
		opts.Insecure = true
	}

	if _, err := telemetry.InitTracer(&opts); err != nil {
		panic(fmt.Errorf("could not initialize tracer: %w", err))
	}
}

// Run loads the server configuration from the config loader, starts the
// engine, and blocks until ctx is cancelled. After cancellation it waits for
// the configured shutdown grace period, then executes every teardown function
// in order, returning the first teardown error encountered.
func Run(ctx context.Context, cf *loader.ConfigLoader, version string) error {
	serverCleanup, server, err := cf.CreateServerFromConfig(version)
	if err != nil {
		return fmt.Errorf("could not load server config: %w", err)
	}

	var l = server.Logger

	teardown, err := RunWithConfig(ctx, server)

	if err != nil {
		// the server config was created successfully above, so release its
		// resources and the database connection even though the engine failed
		// to start; these cleanup errors are logged rather than returned so
		// that the original startup error is what propagates to the caller
		if cleanupErr := serverCleanup(); cleanupErr != nil {
			l.Error().Err(cleanupErr).Msg("could not cleanup server config after startup failure")
		}

		if disconnectErr := server.Disconnect(); disconnectErr != nil {
			l.Error().Err(disconnectErr).Msg("could not disconnect from database after startup failure")
		}

		return fmt.Errorf("could not run with config: %w", err)
	}

	// the server-config and database teardowns are appended last so they run
	// after every service registered by RunWithConfig has shut down
	teardown = append(teardown, Teardown{
		Name: "server",
		Fn: func() error {
			return serverCleanup()
		},
	})

	teardown = append(teardown, Teardown{
		Name: "database",
		Fn: func() error {
			return server.Disconnect()
		},
	})

	// RunWithConfig only returns once ctx is done, so this sleep is the
	// shutdown grace period before teardown begins
	time.Sleep(server.Runtime.ShutdownWait)

	l.Debug().Msgf("interrupt received, shutting down")

	l.Debug().Msgf("waiting for all other services to gracefully exit...")
	for i, t := range teardown {
		l.Debug().Msgf("shutting down %s (%d/%d)", t.Name, i+1, len(teardown))
		err := t.Fn()

		if err != nil {
			return fmt.Errorf("could not teardown %s: %w", t.Name, err)
		}
		l.Debug().Msgf("successfully shutdown %s (%d/%d)", t.Name, i+1, len(teardown))
	}
	l.Debug().Msgf("all services have successfully gracefully exited")

	l.Debug().Msgf("successfully shutdown")

	return nil
}

// RunWithConfig starts the engine from an already-loaded server config and
// returns the teardown functions accumulated during startup. The presence of
// any v1-style service name selects the v1 engine layout; otherwise the
// legacy v0 layout is used.
func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, error) {
	for _, svc := range []string{"all", "scheduler", "controllers", "grpc-api"} {
		if sc.HasService(svc) {
			return runV1Config(ctx, sc)
		}
	}

	return runV0Config(ctx, sc)
}

// runV0Config starts the legacy (v0) engine service layout. Each service
// enabled via sc.HasService is constructed and started, and its cleanup
// function is appended to the returned teardown list in startup order. The
// function blocks on ctx.Done() and only returns the teardown list once the
// context is cancelled; the caller is responsible for executing the teardowns.
func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, error) {
	var l = sc.Logger

	// re-initialize the tracer from the loaded config (the package init only
	// saw environment variables); shutdown is registered as the last teardown
	shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
		ServiceName:   sc.OpenTelemetry.ServiceName,
		CollectorURL:  sc.OpenTelemetry.CollectorURL,
		TraceIdRatio:  sc.OpenTelemetry.TraceIdRatio,
		Insecure:      sc.OpenTelemetry.Insecure,
		CollectorAuth: sc.OpenTelemetry.CollectorAuth,
	})
	if err != nil {
		return nil, fmt.Errorf("could not initialize tracer: %w", err)
	}

	teardown := []Teardown{}

	if sc.Prometheus.Enabled {
		teardown = append(teardown, startPrometheus(l, sc.Prometheus))
	}

	// the partition tracks tenant assignment for this engine instance; it is
	// created unconditionally because multiple services below depend on it
	p, err := partition.NewPartition(l, sc.EngineRepository.Tenant())

	if err != nil {
		return nil, fmt.Errorf("could not create partitioner: %w", err)
	}

	teardown = append(teardown, Teardown{
		Name: "partitioner",
		Fn:   p.Shutdown,
	})

	var h *health.Health
	// NOTE(review): v0 gates health probes on the "health" service name while
	// runV1Config gates them on sc.Runtime.Healthcheck — confirm this
	// difference is intentional
	healthProbes := sc.HasService("health")
	if healthProbes {
		h = health.New(sc.EngineRepository, sc.MessageQueue, sc.Version, l)
		cleanup, err := h.Start(sc.Runtime.HealthcheckPort)
		if err != nil {
			return nil, fmt.Errorf("could not start health: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "health",
			Fn:   cleanup,
		})
	}

	if sc.HasService("eventscontroller") {
		ec, err := events.New(
			events.WithMessageQueue(sc.MessageQueue),
			events.WithRepository(sc.EngineRepository),
			events.WithLogger(sc.Logger),
			events.WithEntitlementsRepository(sc.EntitlementRepository),
		)
		if err != nil {
			return nil, fmt.Errorf("could not create events controller: %w", err)
		}

		cleanup, err := ec.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start events controller: %w", err)
		}
		teardown = append(teardown, Teardown{
			Name: "events controller",
			Fn:   cleanup,
		})
	}

	// FIXME: jobscontroller and workflowscontroller are deprecated service names, but there's not a clear upgrade
	// path for old config files.
	// This branch claims the controller and scheduler partitions and starts
	// both the v0 and v1 schedulers; the partitions must be claimed before
	// the controllers below start consuming work.
	if sc.HasService("queue") || sc.HasService("jobscontroller") || sc.HasService("workflowscontroller") || sc.HasService("retention") || sc.HasService("ticker") {
		partitionCleanup, err := p.StartControllerPartition(ctx)
		if err != nil {
			return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "controller partition",
			Fn:   partitionCleanup,
		})

		schedulePartitionCleanup, err := p.StartSchedulerPartition(ctx)

		if err != nil {
			return nil, fmt.Errorf("could not create create scheduler partition: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "scheduler partition",
			Fn:   schedulePartitionCleanup,
		})

		// create the v0 scheduler
		// NOTE(review): the error strings below say "dispatcher" — they look
		// like a copy-paste leftover and arguably should say "scheduler"
		s, err := scheduler.New(
			scheduler.WithAlerter(sc.Alerter),
			scheduler.WithMessageQueue(sc.MessageQueue),
			scheduler.WithRepository(sc.EngineRepository),
			scheduler.WithLogger(sc.Logger),
			scheduler.WithPartition(p),
			scheduler.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
			scheduler.WithSchedulerPool(sc.SchedulingPool),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create dispatcher: %w", err)
		}

		cleanup, err := s.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start dispatcher: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "scheduler",
			Fn:   cleanup,
		})

		// the v1 scheduler runs alongside the v0 one, consuming from the v1
		// message queue and repository
		sv1, err := schedulerv1.New(
			schedulerv1.WithAlerter(sc.Alerter),
			schedulerv1.WithMessageQueue(sc.MessageQueueV1),
			schedulerv1.WithRepository(sc.EngineRepository),
			schedulerv1.WithV2Repository(sc.V1),
			schedulerv1.WithLogger(sc.Logger),
			schedulerv1.WithPartition(p),
			schedulerv1.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
			schedulerv1.WithSchedulerPool(sc.SchedulingPoolV1),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create scheduler (v1): %w", err)
		}

		// reuses the cleanup variable declared for the v0 scheduler above
		cleanup, err = sv1.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start scheduler (v1): %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "schedulerv1",
			Fn:   cleanup,
		})
	}

	if sc.HasService("ticker") {
		t, err := ticker.New(
			ticker.WithMessageQueue(sc.MessageQueue),
			ticker.WithMessageQueueV1(sc.MessageQueueV1),
			ticker.WithRepository(sc.EngineRepository),
			ticker.WithRepositoryV1(sc.V1),
			ticker.WithLogger(sc.Logger),
			ticker.WithTenantAlerter(sc.TenantAlerter),
			ticker.WithEntitlementsRepository(sc.EntitlementRepository),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create ticker: %w", err)
		}

		cleanup, err := t.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start ticker: %w", err)
		}
		teardown = append(teardown, Teardown{
			Name: "ticker",
			Fn:   cleanup,
		})
	}

	// jobs, workflows, tasks (v1) and olap (v1) controllers start together
	if sc.HasService("queue") || sc.HasService("jobscontroller") || sc.HasService("workflowscontroller") {
		jc, err := jobs.New(
			jobs.WithAlerter(sc.Alerter),
			jobs.WithMessageQueue(sc.MessageQueue),
			jobs.WithRepository(sc.EngineRepository),
			jobs.WithLogger(sc.Logger),
			jobs.WithPartition(p),
			jobs.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
			jobs.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create jobs controller: %w", err)
		}

		cleanupJobs, err := jc.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start jobs controller: %w", err)
		}
		teardown = append(teardown, Teardown{
			Name: "jobs controller",
			Fn:   cleanupJobs,
		})

		wc, err := workflows.New(
			workflows.WithAlerter(sc.Alerter),
			workflows.WithMessageQueue(sc.MessageQueue),
			workflows.WithRepository(sc.EngineRepository),
			workflows.WithLogger(sc.Logger),
			workflows.WithTenantAlerter(sc.TenantAlerter),
			workflows.WithPartition(p),
		)
		if err != nil {
			return nil, fmt.Errorf("could not create workflows controller: %w", err)
		}

		cleanupWorkflows, err := wc.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start workflows controller: %w", err)
		}
		teardown = append(teardown, Teardown{
			Name: "workflows controller",
			Fn:   cleanupWorkflows,
		})

		tasks, err := task.New(
			task.WithAlerter(sc.Alerter),
			task.WithMessageQueue(sc.MessageQueueV1),
			task.WithRepository(sc.EngineRepository),
			task.WithV1Repository(sc.V1),
			task.WithLogger(sc.Logger),
			task.WithPartition(p),
			task.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
			task.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats),
			task.WithAnalyzeCronInterval(sc.CronOperations.TaskAnalyzeCronInterval),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create tasks controller: %w", err)
		}

		cleanupTasks, err := tasks.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start tasks controller: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "tasks controller",
			Fn:   cleanupTasks,
		})

		// batch size limits for OLAP status updates, sourced from config
		sizeLimits := v1.StatusUpdateBatchSizeLimits{
			Task: int32(sc.OLAPStatusUpdates.TaskBatchSizeLimit),
			DAG:  int32(sc.OLAPStatusUpdates.DagBatchSizeLimit),
		}

		// (this local shadows the olap package name for the rest of the scope)
		olap, err := olap.New(
			olap.WithAlerter(sc.Alerter),
			olap.WithMessageQueue(sc.MessageQueueV1),
			olap.WithRepository(sc.V1),
			olap.WithLogger(sc.Logger),
			olap.WithPartition(p),
			olap.WithTenantAlertManager(sc.TenantAlerter),
			olap.WithSamplingConfig(sc.Sampling),
			olap.WithOperationsConfig(sc.Operations),
			olap.WithAnalyzeCronInterval(sc.CronOperations.OLAPAnalyzeCronInterval),
			olap.WithOLAPStatusUpdateBatchSizeLimits(sizeLimits),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create olap controller: %w", err)
		}

		cleanupOlap, err := olap.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start olap controller: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "olap controller",
			Fn:   cleanupOlap,
		})
	}

	if sc.HasService("retention") {
		rc, err := retention.New(
			retention.WithAlerter(sc.Alerter),
			retention.WithRepository(sc.EngineRepository),
			retention.WithLogger(sc.Logger),
			retention.WithTenantAlerter(sc.TenantAlerter),
			retention.WithPartition(p),
			retention.WithDataRetention(sc.EnableDataRetention),
			retention.WithWorkerRetention(sc.EnableWorkerRetention),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create retention controller: %w", err)
		}

		cleanupRetention, err := rc.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start retention controller: %w", err)
		}
		teardown = append(teardown, Teardown{
			Name: "retention controller",
			Fn:   cleanupRetention,
		})
	}

	// the grpc service bundles the dispatcher (v0 and v1), event ingestor,
	// admin services and the grpc server itself
	if sc.HasService("grpc") {
		// cache shared by the dispatcher; stopped as part of the grpc teardown
		cacheInstance := cache.New(10 * time.Second)

		// create the dispatcher
		d, err := dispatcher.New(
			dispatcher.WithAlerter(sc.Alerter),
			dispatcher.WithMessageQueue(sc.MessageQueue),
			dispatcher.WithMessageQueueV1(sc.MessageQueueV1),
			dispatcher.WithRepository(sc.EngineRepository),
			dispatcher.WithRepositoryV1(sc.V1),
			dispatcher.WithLogger(sc.Logger),
			dispatcher.WithEntitlementsRepository(sc.EntitlementRepository),
			dispatcher.WithCache(cacheInstance),
			dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
			dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create dispatcher: %w", err)
		}

		dispatcherCleanup, err := d.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start dispatcher: %w", err)
		}

		dv1, err := dispatcherv1.NewDispatcherService(
			dispatcherv1.WithRepository(sc.V1),
			dispatcherv1.WithMessageQueue(sc.MessageQueueV1),
			dispatcherv1.WithLogger(sc.Logger),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create dispatcher (v1): %w", err)
		}

		// create the event ingestor
		ei, err := ingestor.NewIngestor(
			ingestor.WithEventRepository(
				sc.EngineRepository.Event(),
			),
			ingestor.WithStreamEventsRepository(
				sc.EngineRepository.StreamEvent(),
			),
			ingestor.WithLogRepository(
				sc.EngineRepository.Log(),
			),
			ingestor.WithMessageQueue(sc.MessageQueue),
			ingestor.WithMessageQueueV1(sc.MessageQueueV1),
			ingestor.WithEntitlementsRepository(sc.EntitlementRepository),
			ingestor.WithStepRunRepository(sc.EngineRepository.StepRun()),
			ingestor.WithRepositoryV1(sc.V1),
			ingestor.WithLogIngestionEnabled(sc.Runtime.LogIngestionEnabled),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create ingestor: %w", err)
		}

		adminSvc, err := admin.NewAdminService(
			admin.WithRepository(sc.EngineRepository),
			admin.WithRepositoryV1(sc.V1),
			admin.WithMessageQueue(sc.MessageQueue),
			admin.WithMessageQueueV1(sc.MessageQueueV1),
			admin.WithEntitlementsRepository(sc.EntitlementRepository),
		)
		if err != nil {
			return nil, fmt.Errorf("could not create admin service: %w", err)
		}

		adminv1Svc, err := adminv1.NewAdminService(
			adminv1.WithRepository(sc.V1),
			adminv1.WithMessageQueue(sc.MessageQueueV1),
			adminv1.WithEntitlementsRepository(sc.EntitlementRepository),
			adminv1.WithAnalytics(sc.Analytics),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create admin service (v1): %w", err)
		}

		grpcOpts := []grpc.ServerOpt{
			grpc.WithConfig(sc),
			grpc.WithIngestor(ei),
			grpc.WithDispatcher(d),
			grpc.WithDispatcherV1(dv1),
			grpc.WithAdmin(adminSvc),
			grpc.WithAdminV1(adminv1Svc),
			grpc.WithLogger(sc.Logger),
			grpc.WithAlerter(sc.Alerter),
			grpc.WithTLSConfig(sc.TLSConfig),
			grpc.WithPort(sc.Runtime.GRPCPort),
			grpc.WithBindAddress(sc.Runtime.GRPCBindAddress),
		}

		if sc.Runtime.GRPCInsecure {
			grpcOpts = append(grpcOpts, grpc.WithInsecure())
		}

		// create the grpc server
		s, err := grpc.NewServer(
			grpcOpts...,
		)
		if err != nil {
			return nil, fmt.Errorf("could not create grpc server: %w", err)
		}

		grpcServerCleanup, err := s.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start grpc server: %w", err)
		}

		// the dispatcher and grpc server are torn down concurrently; the
		// dispatcher's cache is stopped only after the dispatcher exits
		cleanup := func() error {
			g := new(errgroup.Group)

			g.Go(func() error {
				err := dispatcherCleanup()
				if err != nil {
					return fmt.Errorf("failed to cleanup dispatcher: %w", err)
				}

				cacheInstance.Stop()
				return nil
			})

			g.Go(func() error {
				err := grpcServerCleanup()
				if err != nil {
					return fmt.Errorf("failed to cleanup GRPC server: %w", err)
				}
				return nil
			})

			if err := g.Wait(); err != nil {
				return fmt.Errorf("could not teardown grpc dispatcher: %w", err)
			}

			return nil
		}

		teardown = append(teardown, Teardown{
			Name: "grpc",
			Fn:   cleanup,
		})
	}

	if sc.HasService("webhookscontroller") {
		// the tenant worker partition must be claimed before the webhook
		// worker starts
		cleanup1, err := p.StartTenantWorkerPartition(ctx)

		if err != nil {
			return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "tenant worker partition",
			Fn:   cleanup1,
		})

		wh := webhooks.New(sc, p, l)

		cleanup2, err := wh.Start()
		if err != nil {
			return nil, fmt.Errorf("could not create webhook worker: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "webhook worker",
			Fn:   cleanup2,
		})
	}

	// telemetry is flushed last so earlier teardowns can still emit spans
	teardown = append(teardown, Teardown{
		Name: "telemetry",
		Fn: func() error {
			return shutdown(ctx)
		},
	})

	l.Debug().Msgf("engine has started")

	// block until the caller cancels the context (e.g. on SIGINT/SIGTERM)
	<-ctx.Done()

	// flip health probes to "shutting down" before teardown begins so load
	// balancers stop routing to this instance
	if healthProbes {
		h.SetShuttingDown(true)
	}

	return teardown, nil
}

// runV1Config starts the v1 engine service layout, selected when the config
// enables "all", "scheduler", "controllers" or "grpc-api". Services are
// constructed and started in dependency order and their cleanup functions are
// appended to the returned teardown list. The function blocks on ctx.Done()
// and only returns the teardown list once the context is cancelled; the
// caller is responsible for executing the teardowns.
func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, error) {
	var l = sc.Logger

	// re-initialize the tracer from the loaded config (the package init only
	// saw environment variables); shutdown is registered as the last teardown
	shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
		ServiceName:   sc.OpenTelemetry.ServiceName,
		CollectorURL:  sc.OpenTelemetry.CollectorURL,
		TraceIdRatio:  sc.OpenTelemetry.TraceIdRatio,
		Insecure:      sc.OpenTelemetry.Insecure,
		CollectorAuth: sc.OpenTelemetry.CollectorAuth,
	})
	if err != nil {
		return nil, fmt.Errorf("could not initialize tracer: %w", err)
	}

	teardown := []Teardown{}

	if sc.Prometheus.Enabled {
		teardown = append(teardown, startPrometheus(l, sc.Prometheus))
	}

	// the partition tracks tenant assignment for this engine instance; it is
	// created unconditionally because multiple services below depend on it
	p, err := partition.NewPartition(l, sc.EngineRepository.Tenant())

	if err != nil {
		return nil, fmt.Errorf("could not create partitioner: %w", err)
	}

	teardown = append(teardown, Teardown{
		Name: "partitioner",
		Fn:   p.Shutdown,
	})

	// unlike runV0Config, health probes are gated on a runtime flag rather
	// than a service name
	healthProbes := sc.Runtime.Healthcheck
	var h *health.Health

	if healthProbes {
		h = health.New(sc.EngineRepository, sc.MessageQueue, sc.Version, l)

		cleanup, err := h.Start(sc.Runtime.HealthcheckPort)

		if err != nil {
			return nil, fmt.Errorf("could not start health: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "health",
			Fn:   cleanup,
		})
	}

	// the "controllers" role bundles the controller partition, events,
	// ticker, jobs, workflows, retention, task/olap (unless paused), the
	// tenant worker partition and the webhook worker
	if sc.HasService("all") || sc.HasService("controllers") {
		partitionCleanup, err := p.StartControllerPartition(ctx)

		if err != nil {
			return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "controller partition",
			Fn:   partitionCleanup,
		})

		ec, err := events.New(
			events.WithMessageQueue(sc.MessageQueue),
			events.WithRepository(sc.EngineRepository),
			events.WithLogger(sc.Logger),
			events.WithEntitlementsRepository(sc.EntitlementRepository),
		)
		if err != nil {
			return nil, fmt.Errorf("could not create events controller: %w", err)
		}

		cleanup, err := ec.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start events controller: %w", err)
		}
		teardown = append(teardown, Teardown{
			Name: "events controller",
			Fn:   cleanup,
		})

		t, err := ticker.New(
			ticker.WithMessageQueue(sc.MessageQueue),
			ticker.WithMessageQueueV1(sc.MessageQueueV1),
			ticker.WithRepository(sc.EngineRepository),
			ticker.WithRepositoryV1(sc.V1),
			ticker.WithLogger(sc.Logger),
			ticker.WithTenantAlerter(sc.TenantAlerter),
			ticker.WithEntitlementsRepository(sc.EntitlementRepository),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create ticker: %w", err)
		}

		// reuses the cleanup variable declared for the events controller
		cleanup, err = t.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start ticker: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "ticker",
			Fn:   cleanup,
		})

		jc, err := jobs.New(
			jobs.WithAlerter(sc.Alerter),
			jobs.WithMessageQueue(sc.MessageQueue),
			jobs.WithRepository(sc.EngineRepository),
			jobs.WithLogger(sc.Logger),
			jobs.WithPartition(p),
			jobs.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
			jobs.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create jobs controller: %w", err)
		}

		cleanupJobs, err := jc.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start jobs controller: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "jobs controller",
			Fn:   cleanupJobs,
		})

		wc, err := workflows.New(
			workflows.WithAlerter(sc.Alerter),
			workflows.WithMessageQueue(sc.MessageQueue),
			workflows.WithRepository(sc.EngineRepository),
			workflows.WithLogger(sc.Logger),
			workflows.WithTenantAlerter(sc.TenantAlerter),
			workflows.WithPartition(p),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create workflows controller: %w", err)
		}

		cleanupWorkflows, err := wc.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start workflows controller: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "workflows controller",
			Fn:   cleanupWorkflows,
		})

		rc, err := retention.New(
			retention.WithAlerter(sc.Alerter),
			retention.WithRepository(sc.EngineRepository),
			retention.WithLogger(sc.Logger),
			retention.WithTenantAlerter(sc.TenantAlerter),
			retention.WithPartition(p),
			retention.WithDataRetention(sc.EnableDataRetention),
			retention.WithWorkerRetention(sc.EnableWorkerRetention),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create retention controller: %w", err)
		}

		cleanupRetention, err := rc.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start retention controller: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "retention controller",
			Fn:   cleanupRetention,
		})

		// the task controller can be selectively paused via config
		if isControllerActive(sc.PausedControllers, TaskController) {
			tasks, err := task.New(
				task.WithAlerter(sc.Alerter),
				task.WithMessageQueue(sc.MessageQueueV1),
				task.WithRepository(sc.EngineRepository),
				task.WithV1Repository(sc.V1),
				task.WithLogger(sc.Logger),
				task.WithPartition(p),
				task.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
				task.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats),
				task.WithOpsPoolJitter(sc.Operations),
				task.WithReplayEnabled(sc.Runtime.ReplayEnabled),
				task.WithAnalyzeCronInterval(sc.CronOperations.TaskAnalyzeCronInterval),
			)

			if err != nil {
				return nil, fmt.Errorf("could not create tasks controller: %w", err)
			}

			cleanupTasks, err := tasks.Start()

			if err != nil {
				return nil, fmt.Errorf("could not start tasks controller: %w", err)
			}

			teardown = append(teardown, Teardown{
				Name: "tasks controller",
				Fn:   cleanupTasks,
			})
		}

		// the olap controller can be selectively paused via config
		if isControllerActive(sc.PausedControllers, OLAPController) {
			// batch size limits for OLAP status updates, sourced from config
			sizeLimits := v1.StatusUpdateBatchSizeLimits{
				Task: int32(sc.OLAPStatusUpdates.TaskBatchSizeLimit),
				DAG:  int32(sc.OLAPStatusUpdates.DagBatchSizeLimit),
			}

			// (this local shadows the olap package name within this scope)
			olap, err := olap.New(
				olap.WithAlerter(sc.Alerter),
				olap.WithMessageQueue(sc.MessageQueueV1),
				olap.WithRepository(sc.V1),
				olap.WithLogger(sc.Logger),
				olap.WithPartition(p),
				olap.WithTenantAlertManager(sc.TenantAlerter),
				olap.WithSamplingConfig(sc.Sampling),
				olap.WithOperationsConfig(sc.Operations),
				olap.WithPrometheusMetricsEnabled(sc.Prometheus.Enabled),
				olap.WithAnalyzeCronInterval(sc.CronOperations.OLAPAnalyzeCronInterval),
				olap.WithOLAPStatusUpdateBatchSizeLimits(sizeLimits),
			)

			if err != nil {
				return nil, fmt.Errorf("could not create olap controller: %w", err)
			}

			cleanupOlap, err := olap.Start()

			if err != nil {
				return nil, fmt.Errorf("could not start olap controller: %w", err)
			}

			teardown = append(teardown, Teardown{
				Name: "olap controller",
				Fn:   cleanupOlap,
			})
		}

		// the tenant worker partition must be claimed before the webhook
		// worker starts
		cleanup1, err := p.StartTenantWorkerPartition(ctx)

		if err != nil {
			return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "tenant worker partition",
			Fn:   cleanup1,
		})

		wh := webhooks.New(sc, p, l)

		cleanup2, err := wh.Start()

		if err != nil {
			return nil, fmt.Errorf("could not create webhook worker: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "webhook worker",
			Fn:   cleanup2,
		})
	}

	// the "grpc-api" role bundles the dispatcher (v0 and v1), event ingestor,
	// admin services and the grpc server itself
	if sc.HasService("all") || sc.HasService("grpc-api") {
		// cache shared by the dispatcher; stopped as part of the grpc teardown
		cacheInstance := cache.New(10 * time.Second)

		// create the dispatcher
		d, err := dispatcher.New(
			dispatcher.WithAlerter(sc.Alerter),
			dispatcher.WithMessageQueue(sc.MessageQueue),
			dispatcher.WithMessageQueueV1(sc.MessageQueueV1),
			dispatcher.WithRepository(sc.EngineRepository),
			dispatcher.WithRepositoryV1(sc.V1),
			dispatcher.WithLogger(sc.Logger),
			dispatcher.WithEntitlementsRepository(sc.EntitlementRepository),
			dispatcher.WithCache(cacheInstance),
			dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
			dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create dispatcher: %w", err)
		}

		dispatcherCleanup, err := d.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start dispatcher: %w", err)
		}

		dv1, err := dispatcherv1.NewDispatcherService(
			dispatcherv1.WithRepository(sc.V1),
			dispatcherv1.WithMessageQueue(sc.MessageQueueV1),
			dispatcherv1.WithLogger(sc.Logger),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create dispatcher (v1): %w", err)
		}

		// create the event ingestor
		ei, err := ingestor.NewIngestor(
			ingestor.WithEventRepository(
				sc.EngineRepository.Event(),
			),
			ingestor.WithStreamEventsRepository(
				sc.EngineRepository.StreamEvent(),
			),
			ingestor.WithLogRepository(
				sc.EngineRepository.Log(),
			),
			ingestor.WithMessageQueue(sc.MessageQueue),
			ingestor.WithMessageQueueV1(sc.MessageQueueV1),
			ingestor.WithEntitlementsRepository(sc.EntitlementRepository),
			ingestor.WithStepRunRepository(sc.EngineRepository.StepRun()),
			ingestor.WithRepositoryV1(sc.V1),
			ingestor.WithLogIngestionEnabled(sc.Runtime.LogIngestionEnabled),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create ingestor: %w", err)
		}

		adminSvc, err := admin.NewAdminService(
			admin.WithRepository(sc.EngineRepository),
			admin.WithRepositoryV1(sc.V1),
			admin.WithMessageQueue(sc.MessageQueue),
			admin.WithMessageQueueV1(sc.MessageQueueV1),
			admin.WithEntitlementsRepository(sc.EntitlementRepository),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create admin service: %w", err)
		}

		adminv1Svc, err := adminv1.NewAdminService(
			adminv1.WithRepository(sc.V1),
			adminv1.WithMessageQueue(sc.MessageQueueV1),
			adminv1.WithEntitlementsRepository(sc.EntitlementRepository),
			adminv1.WithAnalytics(sc.Analytics),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create admin service (v1): %w", err)
		}

		grpcOpts := []grpc.ServerOpt{
			grpc.WithConfig(sc),
			grpc.WithIngestor(ei),
			grpc.WithDispatcher(d),
			grpc.WithDispatcherV1(dv1),
			grpc.WithAdmin(adminSvc),
			grpc.WithAdminV1(adminv1Svc),
			grpc.WithLogger(sc.Logger),
			grpc.WithAlerter(sc.Alerter),
			grpc.WithTLSConfig(sc.TLSConfig),
			grpc.WithPort(sc.Runtime.GRPCPort),
			grpc.WithBindAddress(sc.Runtime.GRPCBindAddress),
		}

		if sc.Runtime.GRPCInsecure {
			grpcOpts = append(grpcOpts, grpc.WithInsecure())
		}

		// create the grpc server
		s, err := grpc.NewServer(
			grpcOpts...,
		)
		if err != nil {
			return nil, fmt.Errorf("could not create grpc server: %w", err)
		}

		grpcServerCleanup, err := s.Start()
		if err != nil {
			return nil, fmt.Errorf("could not start grpc server: %w", err)
		}

		// the dispatcher and grpc server are torn down concurrently; the
		// dispatcher's cache is stopped only after the dispatcher exits
		cleanup := func() error {
			g := new(errgroup.Group)

			g.Go(func() error {
				err := dispatcherCleanup()
				if err != nil {
					return fmt.Errorf("failed to cleanup dispatcher: %w", err)
				}

				cacheInstance.Stop()
				return nil
			})

			g.Go(func() error {
				err := grpcServerCleanup()
				if err != nil {
					return fmt.Errorf("failed to cleanup GRPC server: %w", err)
				}
				return nil
			})

			if err := g.Wait(); err != nil {
				return fmt.Errorf("could not teardown grpc dispatcher: %w", err)
			}

			return nil
		}

		teardown = append(teardown, Teardown{
			Name: "grpc",
			Fn:   cleanup,
		})
	}

	// the "scheduler" role claims the scheduler partition and runs both the
	// v0 and v1 schedulers
	if sc.HasService("all") || sc.HasService("scheduler") {
		partitionCleanup, err := p.StartSchedulerPartition(ctx)

		if err != nil {
			return nil, fmt.Errorf("could not create create scheduler partition: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "scheduler partition",
			Fn:   partitionCleanup,
		})

		// create the v0 scheduler
		// NOTE(review): the error strings below say "dispatcher" — they look
		// like a copy-paste leftover and arguably should say "scheduler"
		s, err := scheduler.New(
			scheduler.WithAlerter(sc.Alerter),
			scheduler.WithMessageQueue(sc.MessageQueue),
			scheduler.WithRepository(sc.EngineRepository),
			scheduler.WithLogger(sc.Logger),
			scheduler.WithPartition(p),
			scheduler.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
			scheduler.WithSchedulerPool(sc.SchedulingPool),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create dispatcher: %w", err)
		}

		cleanup, err := s.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start dispatcher: %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "scheduler",
			Fn:   cleanup,
		})

		sv1, err := schedulerv1.New(
			schedulerv1.WithAlerter(sc.Alerter),
			schedulerv1.WithMessageQueue(sc.MessageQueueV1),
			schedulerv1.WithRepository(sc.EngineRepository),
			schedulerv1.WithV2Repository(sc.V1),
			schedulerv1.WithLogger(sc.Logger),
			schedulerv1.WithPartition(p),
			schedulerv1.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue),
			schedulerv1.WithSchedulerPool(sc.SchedulingPoolV1),
		)

		if err != nil {
			return nil, fmt.Errorf("could not create scheduler (v1): %w", err)
		}

		// reuses the cleanup variable declared for the v0 scheduler above
		cleanup, err = sv1.Start()

		if err != nil {
			return nil, fmt.Errorf("could not start scheduler (v1): %w", err)
		}

		teardown = append(teardown, Teardown{
			Name: "schedulerv1",
			Fn:   cleanup,
		})
	}

	// telemetry is flushed last so earlier teardowns can still emit spans
	teardown = append(teardown, Teardown{
		Name: "telemetry",
		Fn: func() error {
			return shutdown(ctx)
		},
	})

	l.Debug().Msgf("engine has started")

	// block until the caller cancels the context (e.g. on SIGINT/SIGTERM)
	<-ctx.Done()

	// flip health probes to "shutting down" before teardown begins so load
	// balancers stop routing to this instance
	if healthProbes {
		h.SetShuttingDown(true)
	}

	return teardown, nil
}

// startPrometheus serves the Prometheus metrics handler at c.Path on
// c.Address in a background goroutine and returns a Teardown that gracefully
// shuts the server down with a 5-second timeout.
func startPrometheus(l *zerolog.Logger, c shared.PrometheusConfigFile) Teardown {
	mux := http.NewServeMux()
	mux.Handle(c.Path, promhttp.Handler())

	srv := &http.Server{
		Addr:    c.Address,
		Handler: mux,
	}

	go func() {
		// ErrServerClosed is the expected result of a graceful Shutdown, so
		// it is not reported as a startup failure; errors.Is is used instead
		// of != so wrapped errors are also recognized
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			l.Error().Err(err).Msg("failed to start prometheus server")
		}
	}()

	l.Info().Msgf("Prometheus server started on %s", c.Address)

	return Teardown{
		Name: "prometheus",
		Fn: func() error {
			// bound the graceful shutdown so teardown cannot hang forever on
			// long-lived scrape connections
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			if err := srv.Shutdown(ctx); err != nil {
				return fmt.Errorf("failed to shutdown prometheus server: %w", err)
			}

			l.Info().Msg("Prometheus server shutdown gracefully")
			return nil
		},
	}
}

// ControllerName identifies a v1 controller that can be selectively paused
// via the server's PausedControllers configuration.
type ControllerName string

const (
	// OLAPController gates the OLAP controller in runV1Config.
	OLAPController ControllerName = "olap"
	// TaskController gates the tasks controller in runV1Config.
	TaskController ControllerName = "task"
)

// isControllerActive reports whether the named controller should run. A
// controller is active unless the map explicitly marks it as paused.
func isControllerActive(pausedControllers map[string]bool, controllerName ControllerName) bool {
	// indexing a nil or missing key yields the zero value (false), so any
	// controller that is absent from the map is treated as active
	return !pausedControllers[string(controllerName)]
}
